package com.syjy.container;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 循环反复任务池
 * 会有线程查看线程任务执行时间 不允许有执行时间超过两秒的线程
 * 最小执行间隔时间为3秒 因为 检查任务线程的频率为2s
 *
 * @author: xiuwei
 * @version:
 */
@Slf4j
public class RecurringTaskContainer {

	/**
	 * 核心数量为10 排序数量为100 默认排序规则  多余线程存活时间为60秒的 定长线程
	 * 用于执行定时任务
	 */
	private static final ExecutorService calculatorThread = new ThreadPoolExecutor(10, 100,
			60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder().build());

	/**
	 * 核心运行数量为4的定时任务池 用于拉起定时任务
	 */
	private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4, new ThreadFactoryBuilder().build());

	/**
	 * 定时任务对象  以时间间隔进行区分
	 */
	private Map<Integer, Map<Callable, String>> tasks = new ConcurrentHashMap();

	/**
	 * 单例
	 *
	 * @return 实例
	 */
	public static final RecurringTaskContainer getInstance() {
		return RecurringTaskContainer.LazyHolder.INSTANCE;
	}

	/**
	 * 向定时任务集合里添加定时任务
	 *
	 * @param interval 时间间隔
	 * @param task     任务
	 * @return 返回添加的任务
	 */
	public <T> Callable addRecurringCallable(Integer interval, String describe, Callable<T> task) {
		if (interval < 2) {
			throw new RuntimeException("该工具不允许执行3s以下间隔的任务");
		}
		log.info("循环任务池添加任务-{}", describe);
		if (getTasks().containsKey(interval)) {
			getTasks().get(interval).put(task, describe);
		} else {
			Map<Callable, String> map = new ConcurrentHashMap();
			map.put(task, describe);
			getTasks().put(interval, map);
			startScheduledTask(interval);
		}
		return task;
	}

	/**
	 * 向定时任务集合里添加定时任务
	 *
	 * @param interval 时间间隔
	 * @param task     任务
	 * @return 返回添加的任务
	 */
	public <T> Callable addRecurringTask(Integer interval, String describe, Task4Recurring<T> task) {
		if (interval < 2) {
			throw new RuntimeException("该工具不允许执行3s以下间隔的任务");
		}
		Callable<T> callable = () -> {
			T o;
			try {
				o = task.task();
			} catch (Exception e) {
				log.error("定时任务本身产生异常", e);
				throw e;
			}
			return o;
		};
		return addRecurringCallable(interval, describe, callable);
	}

	/**
	 * 移除定时任务
	 *
	 * @param interval 时间间隔
	 * @param task     执行的任务线程
	 */
	public void removeTask(Integer interval, Callable task) {
		if (this.getTasks().containsKey(interval)) {
			this.getTasks().get(interval).remove(task);
		}
	}

	private synchronized Map<Integer, Map<Callable, String>> getTasks() {
		return this.tasks;
	}

	/**
	 * 移除定时任务
	 *
	 * @param task 任务线程
	 */
	public void removeTask(Callable task) {
		for (Integer key : getTasks().keySet()) {
			if (this.getTasks().get(key).containsKey(task)) {
				log.info("循环任务池移除任务-{}", this.getTasks().get(key).get(task));
				this.getTasks().get(key).remove(task);
				break;
			}
		}
	}

	/**
	 * 开启定时任务线程 用于拉起具体的定时任务
	 *
	 * @param interval 时间间隔
	 */
	public void startScheduledTask(Integer interval) {
		ConcurrentHashMap futureMap=new ConcurrentHashMap();
		scheduledThreadPool.scheduleAtFixedRate(() -> {
			Map<Callable, String> map = getTasks().get(interval);
			Future future = null;
			for (Callable c : map.keySet()) {
				try {
					future = calculatorThread.submit(c);
					futureMap.put(future, map.get(c)+ DateTime.now().toString("HH:mm:ss:SSS"));
				} catch (Exception e) {
					if (future != null) {
						futureMap.remove(future);
					}
					log.error("执行循环任务时发生异常", e);
				}
				future = null;
			}
		}, 0, interval, TimeUnit.SECONDS);
		scheduledThreadPool.scheduleAtFixedRate(() -> {
			Iterator<Future> iter = futureMap.keySet().iterator();
			Future future;
			while (iter.hasNext()) {
				try {
					future = iter.next();
					if (!future.isDone()) {
						log.info("发现未完成的超时任务{}，取消该任务", futureMap.get(future));
						future.cancel(true);
					}
					if (future.isCancelled() ) {
						log.info("清除已取消的任务{}，删除该任务", futureMap.get(future));
						futureMap.remove(future);
						continue;
					}
					if (future.isDone() ) {
						// log.info("清除已完成的任务{}，删除该任务", futureMap.get(future));
						futureMap.remove(future);
						continue;
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}, interval-1, interval, TimeUnit.SECONDS);
	}

	/**
	 * 它的意义就是为了打印一下日志 就是为了包裹一层try catch
	 *
	 * @param <T>
	 */
	public interface Task4Recurring<T> {
		T task() throws Exception;
	}

	/**
	 * 为了单例的内部类
	 *
	 * @return 实例
	 */
	private static class LazyHolder {
		private static final RecurringTaskContainer INSTANCE = new RecurringTaskContainer();
	}
}
